Python 中连接 MSSQL/MySQL/SQLite/Redis/ElasticSearch/Mongodb/PostgreSQL/Oracle/RabbitMQ

在 Python 中连接MSSQL,MySQL,SQLite,Redis,ElasticSearch,Mongodb,PostgreSQL,Oracle,RabbitMQ等数据库/中间件的方式

Python 中连接 MSSQL/MySQL/SQLite/Redis/ElasticSearch/Mongodb/PostgreSQL/Oracle/RabbitMQ

Python 中连接 MSSQL 数据库

要在 Python 中连接 MSSQL 数据库,可以使用 pyodbc 模块。以下是一个简单的示例:

首先,需要安装 pyodbc 模块。可以使用 pip 命令进行安装:

“`
pip install pyodbc
“`

然后,可以使用以下代码连接到 MSSQL 数据库:

“`python
import pyodbc

# 建立连接
conn = pyodbc.connect(‘DRIVER={SQL Server};’
‘SERVER=server_name;’
‘DATABASE=database_name;’
‘UID=username;’
‘PWD=password’)

# 创建游标
cursor = conn.cursor()

# 执行查询
cursor.execute(‘SELECT * FROM table_name’)

# 获取查询结果
for row in cursor:
print(row)

# 关闭连接
conn.close()
“`

在上面的代码中,将 `server_name` 替换为 MSSQL 服务器的名称,将 `database_name` 替换为要连接的数据库的名称,将 `username` 和 `password` 替换为登录 MSSQL 的用户名和密码。

接下来,使用 `pyodbc.connect()` 方法建立连接,该方法需要指定连接字符串。在连接字符串中,使用 `DRIVER={SQL Server}` 指定要使用的驱动程序,`SERVER` 指定服务器名称,`DATABASE` 指定要连接的数据库名称,`UID` 指定登录的用户名,`PWD` 指定登录的密码。

然后,使用 `conn.cursor()` 方法创建游标,使用 `cursor.execute()` 方法执行查询语句,使用 `for` 循环遍历查询结果,最后使用 `conn.close()` 方法关闭连接。

 

Python 中连接 MySQL 数据库

 

要在 Python 中连接 MySQL 数据库,可以使用 PyMySQL 模块。以下是一个简单的示例:

首先,需要安装 PyMySQL 模块。可以使用 pip 命令进行安装:

“`
pip install pymysql
“`

然后,可以使用以下代码连接到 MySQL 数据库:

“`python
import pymysql

# 建立连接
conn = pymysql.connect(host=’localhost’, port=3306, user=’username’, password=’password’, database=’database_name’)

# 创建游标
cursor = conn.cursor()

# 执行查询
cursor.execute(‘SELECT * FROM table_name’)

# 获取查询结果
for row in cursor:
print(row)

# 关闭连接
conn.close()
“`

在上面的代码中,将 `host` 替换为 MySQL 服务器的地址,将 `port` 替换为 MySQL 服务器的端口号,将 `username` 和 `password` 替换为登录 MySQL 的用户名和密码,将 `database_name` 替换为要连接的数据库的名称。

接下来,使用 `pymysql.connect()` 方法建立连接,该方法需要指定连接参数。在连接参数中,使用 `host` 指定 MySQL 服务器的地址,`port` 指定 MySQL 服务器的端口号,`user` 指定登录的用户名,`password` 指定登录的密码,`database` 指定要连接的数据库名称。

然后,使用 `conn.cursor()` 方法创建游标,使用 `cursor.execute()` 方法执行查询语句,使用 `for` 循环遍历查询结果,最后使用 `conn.close()` 方法关闭连接。

 

Python 中连接 SQLite 数据库

要在 Python 中连接 SQLite 数据库,可以使用内置的 sqlite3 模块。以下是一个简单的示例:

首先,需要使用以下代码建立连接:

“`python
import sqlite3

# 建立连接
conn = sqlite3.connect(‘my_database.db’)

# 创建游标对象
cursor = conn.cursor()
“`

在上面的代码中,使用 `sqlite3.connect()` 方法建立连接。该方法需要指定数据库文件的路径。如果该文件不存在,则会自动创建一个新的数据库文件。如果路径为 `:memory:`,则表示在内存中创建一个临时数据库。

接下来,使用 `cursor()` 方法创建游标对象,该对象用于执行 SQL 查询语句。

例如,可以使用以下代码创建一个名为 `users` 的表:

“`python
# 创建表格
cursor.execute(”’
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT
)
”’)
“`

在上面的代码中,使用 `execute()` 方法执行 SQL 查询语句。在该示例中,创建了一个包含 `id`、`name` 和 `email` 三个字段的表格。

接下来,可以使用以下代码插入一些数据:

“`python
# 插入数据
cursor.execute(“INSERT INTO users (name, email) VALUES (?, ?)”, (‘Alice’, ‘alice@example.com’))
cursor.execute(“INSERT INTO users (name, email) VALUES (?, ?)”, (‘Bob’, ‘bob@example.com’))
cursor.execute(“INSERT INTO users (name, email) VALUES (?, ?)”, (‘Charlie’, ‘charlie@example.com’))

# 提交更改
conn.commit()
“`

在上面的代码中,使用 `execute()` 方法执行 SQL 查询语句。在该示例中,插入了三条数据,每条数据包含 `name` 和 `email` 两个字段的值。在插入数据后,使用 `commit()` 方法提交更改。

接下来,可以使用以下代码查询数据:

“`python
# 查询数据
cursor.execute(“SELECT * FROM users”)
rows = cursor.fetchall()

# 打印结果
for row in rows:
print(row)
“`

在上面的代码中,使用 `execute()` 方法执行 SQL 查询语句。在该示例中,查询了 `users` 表中的所有数据,并使用 `fetchall()` 方法获取查询结果。最后,使用循环遍历查询结果,并打印每行数据。

最后,可以使用以下代码关闭连接:

“`python
# 关闭连接
cursor.close()
conn.close()
“`

在上面的代码中,使用 `close()` 方法关闭游标对象和数据库连接对象。

除了以上的连接方法外,还可以使用 SQLAlchemy 模块来连接 SQLite 数据库。SQLAlchemy 是一个 Python 的 SQL 工具包,支持多种 SQL 数据库,包括 SQLite。以下是一个使用 SQLAlchemy 的示例:

“`python
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 建立连接
engine = create_engine(‘sqlite:///my_database.db’, echo=True)

# 创建 Session 类
Session = sessionmaker(bind=engine)

# 创建 Base 类
Base = declarative_base()

# 创建 User 类
class User(Base):
__tablename__ = ‘users’

id = Column(Integer, primary_key=True)
name = Column(String)
email = Column(String)

# 创建表格
Base.metadata.create_all(engine)

# 创建 Session 对象
session = Session()

# 插入数据
session.add_all([
User(name=’Alice’, email=’alice@example.com’),
User(name=’Bob’, email=’bob@example.com’),
User(name=’Charlie’, email=’charlie@example.com’)
])
session.commit()

# 查询数据
users = session.query(User).all()
for user in users:
print(user.id, user.name, user.email)

# 关闭 Session 对象
session.close()
“`

在上面的代码中,使用 `create_engine()` 方法建立连接。该方法需要指定数据库文件的路径。在该示例中,使用了 `echo=True` 参数,表示在控制台输出 SQL 查询语句,方便调试。

接下来,使用 `sessionmaker()` 方法创建 Session 类,该类用于创建 Session 对象。使用 `declarative_base()` 方法创建 Base 类,该类用于定义数据模型。

在该示例中,定义了一个 `User` 类,该类继承了 `Base` 类,并定义了 `id`

 

Python 中连接 Redis

要在 Python 中连接 Redis 数据库,可以使用 redis 模块。以下是一个简单的示例:

首先,需要安装 redis 模块。可以使用 pip 命令进行安装:

“`
pip install redis
“`

然后,可以使用以下代码连接到 Redis 数据库:

“`python
import redis

# 建立连接
r = redis.Redis(host=’localhost’, port=6379, db=0)

# 设置键值对
r.set(‘key’, ‘value’)

# 获取键值对
value = r.get(‘key’)
print(value)

# 关闭连接
r.close()
“`

在上面的代码中,将 `host` 替换为 Redis 服务器的地址,将 `port` 替换为 Redis 服务器的端口号,将 `db` 替换为要连接的数据库的编号。

接下来,使用 `redis.Redis()` 方法建立连接,该方法需要指定连接参数。在连接参数中,使用 `host` 指定 Redis 服务器的地址,`port` 指定 Redis 服务器的端口号,`db` 指定要连接的数据库的编号。

然后,使用 `r.set()` 方法设置键值对,使用 `r.get()` 方法获取键值对,最后使用 `r.close()` 方法关闭连接。

除了以上的连接方法外,还可以使用 Redis 的连接池来提高连接的效率。连接池可以在多个线程之间共享连接,从而减少连接的开销。以下是一个使用连接池的示例:

“`python
import redis
from redis import ConnectionPool

# 建立连接池
pool = ConnectionPool(host=’localhost’, port=6379, db=0)

# 获取连接
r = redis.Redis(connection_pool=pool)

# 设置键值对
r.set(‘key’, ‘value’)

# 获取键值对
value = r.get(‘key’)
print(value)

# 关闭连接
r.close()
“`

在上面的代码中,使用 `redis.ConnectionPool()` 方法创建连接池,然后使用 `redis.Redis()` 方法获取连接,指定 `connection_pool` 参数为连接池。其他操作与前面的示例相同。

使用连接池可以提高连接的效率,但需要注意连接池的大小,以免占用过多的资源。可以使用 `max_connections` 参数来指定连接池的大小。

 

Python 中连接 ElasticSearch

 

要在 Python 中连接 ElasticSearch 数据库,可以使用 elasticsearch 模块。以下是一个简单的示例:

首先,需要安装 elasticsearch 模块。可以使用 pip 命令进行安装:

“`
pip install elasticsearch
“`

然后,可以使用以下代码连接到 ElasticSearch 数据库:

“`python
from elasticsearch import Elasticsearch

# 建立连接
es = Elasticsearch([‘localhost:9200′])

# 创建索引
es.indices.create(index=’my_index’)

# 添加文档
doc = {‘title’: ‘Elasticsearch Tutorial’, ‘content’: ‘This is a tutorial for Elasticsearch‘}
es.index(index=’my_index’, id=1, body=doc)

# 搜索文档
res = es.search(index=’my_index’, body={‘query’: {‘match’: {‘content’: ‘tutorial’}}})
print(res)

# 关闭连接
es.close()
“`

在上面的代码中,将 `localhost:9200` 替换为 ElasticSearch 服务器的地址和端口号。

接下来,使用 `Elasticsearch()` 方法建立连接,该方法需要指定连接参数。在连接参数中,使用一个包含 ElasticSearch 服务器地址的列表。

然后,使用 `es.indices.create()` 方法创建索引,使用 `es.index()` 方法添加文档,使用 `es.search()` 方法搜索文档,最后使用 `es.close()` 方法关闭连接。

除了以上的连接方法外,还可以使用 ElasticSearch 的连接池来提高连接的效率。连接池可以在多个线程之间共享连接,从而减少连接的开销。以下是一个使用连接池的示例:

“`python
from elasticsearch import Elasticsearch
from elasticsearch import ConnectionPool, ConnectionSelector

# 创建连接池
pool = ConnectionPool([{‘host’: ‘localhost’, ‘port’: 9200}], maxsize=10)

# 创建连接选择器
selector = ConnectionSelector([pool])

# 建立连接
es = Elasticsearch(connection_class=selector)

# 创建索引
es.indices.create(index=’my_index’)

# 添加文档
doc = {‘title’: ‘Elasticsearch Tutorial’, ‘content’: ‘This is a tutorial for Elasticsearch’}
es.index(index=’my_index’, id=1, body=doc)

# 搜索文档
res = es.search(index=’my_index’, body={‘query’: {‘match’: {‘content’: ‘tutorial’}}})
print(res)

# 关闭连接
es.close()
“`

在上面的代码中,使用 `elasticsearch.ConnectionPool()` 方法创建连接池,指定 ElasticSearch 服务器的地址和端口号。然后,使用 `elasticsearch.ConnectionSelector()` 方法创建连接选择器,将连接池传递给选择器。最后,使用 `Elasticsearch()` 方法建立连接,指定 `connection_class` 参数为连接选择器。

使用连接池可以提高连接的效率,但需要注意连接池的大小,以免占用过多的资源。可以使用 `maxsize` 参数来指定连接池的大小。

 

Python 中连接 MongoDB

要在 Python 中连接 MongoDB 数据库,可以使用 pymongo 模块。以下是一个简单的示例:

首先,需要安装 pymongo 模块。可以使用 pip 命令进行安装:

“`
pip install pymongo
“`

然后,可以使用以下代码连接到 MongoDB 数据库:

“`python
import pymongo

# 建立连接
client = pymongo.MongoClient(‘mongodb://localhost:27017/’)

# 创建数据库和集合
db = client[‘my_database’]
col = db[‘my_collection’]

# 插入文档
doc = {‘name’: ‘John’, ‘age’: 30}
col.insert_one(doc)

# 查询文档
res = col.find({‘name’: ‘John’})
for doc in res:
print(doc)

# 关闭连接
client.close()
“`

在上面的代码中,将 `mongodb://localhost:27017/` 替换为 MongoDB 服务器的地址和端口号。

接下来,使用 `pymongo.MongoClient()` 方法建立连接,该方法需要指定连接参数。在连接参数中,使用 MongoDB 服务器的地址和端口号。

然后,使用 `client[‘my_database’]` 方法创建数据库,使用 `db[‘my_collection’]` 方法创建集合。可以将 `my_database` 和 `my_collection` 替换为要创建的数据库和集合的名称。

接下来,使用 `col.insert_one()` 方法插入文档,使用 `col.find()` 方法查询文档,最后使用 `client.close()` 方法关闭连接。

除了以上的连接方法外,还可以使用 MongoDB 的连接池来提高连接的效率。连接池可以在多个线程之间共享连接,从而减少连接的开销。以下是一个使用连接池的示例:

“`python
import pymongo
from pymongo import MongoClient, uri_parser
from pymongo.pool import Pool, PooledConnection

class ConnectionPool(Pool):
def __init__(self, uri, *args, **kwargs):
super().__init__(*args, **kwargs)
self.uri = uri
self.client = MongoClient(uri, connect=False)

def create_connection(self):
return PooledConnection(self.client)

def get_uri(self):
return self.uri

# 建立连接池
uri = ‘mongodb://localhost:27017/my_database’
pool = ConnectionPool(uri, max_size=10)

# 建立连接
client = MongoClient(pool=pool)

# 创建数据库和集合
db = client.my_database
col = db.my_collection

# 插入文档
doc = {‘name’: ‘John’, ‘age’: 30}
col.insert_one(doc)

# 查询文档
res = col.find({‘name’: ‘John’})
for doc in res:
print(doc)

# 关闭连接
client.close()
“`

在上面的代码中,使用 `ConnectionPool` 类继承自 `pymongo.pool.Pool` 类,重写了 `create_connection()` 方法和 `get_uri()` 方法。在 `create_connection()` 方法中,使用 `PooledConnection` 类创建连接。在 `get_uri()` 方法中,返回 MongoDB 服务器的地址和端口号。

然后,使用 `ConnectionPool` 类创建连接池,指定 MongoDB 服务器的地址和端口号。使用 `max_size` 参数来指定连接池的大小。

最后,使用 `MongoClient(pool=pool)` 方法建立连接,指定 `pool` 参数为连接池。使用 `client.my_database` 和 `db.my_collection` 方法创建数据库和集合。可以将 `my_database` 和 `my_collection` 替换为要创建的数据库和集合的名称。

 

Python 中连接 PostgreSQL

要在 Python 中连接 PostgreSQL 数据库,可以使用 psycopg2 模块。以下是一个简单的示例:

首先,需要安装 psycopg2 模块。可以使用 pip 命令进行安装:

“`
pip install psycopg2
“`

然后,可以使用以下代码连接到 PostgreSQL 数据库:

“`python
import psycopg2

# 建立连接
conn = psycopg2.connect(host=’localhost’, port=5432, dbname=’my_database’, user=’my_username’, password=’my_password’)

# 创建游标
cur = conn.cursor()

# 创建表
cur.execute(‘CREATE TABLE my_table (id SERIAL PRIMARY KEY, name VARCHAR(50), age INTEGER)’)

# 插入数据
cur.execute(“INSERT INTO my_table (name, age) VALUES (%s, %s)”, (‘John’, 30))

# 查询数据
cur.execute(“SELECT * FROM my_table WHERE name = %s”, (‘John’,))
rows = cur.fetchall()
for row in rows:
print(row)

# 提交事务
conn.commit()

# 关闭游标和连接
cur.close()
conn.close()
“`

在上面的代码中,将 `localhost` 替换为 PostgreSQL 服务器的地址,将 `5432` 替换为 PostgreSQL 服务器的端口号,将 `my_database`、`my_username` 和 `my_password` 替换为要连接的数据库、用户名和密码。

接下来,使用 `psycopg2.connect()` 方法建立连接,该方法需要指定连接参数。在连接参数中,使用 PostgreSQL 服务器的地址、端口号、数据库、用户名和密码。

然后,使用 `conn.cursor()` 方法创建游标,使用 `cur.execute()` 方法执行 SQL 命令。在上面的示例中,使用 `CREATE TABLE` 命令创建表,使用 `INSERT INTO` 命令插入数据,使用 `SELECT` 命令查询数据。

最后,使用 `conn.commit()` 方法提交事务,使用 `cur.close()` 和 `conn.close()` 方法关闭游标和连接。

除了以上的连接方法外,还可以使用 SQLAlchemy 模块来连接 PostgreSQL 数据库。SQLAlchemy 是一个 Python 的 SQL 工具和 ORM(对象关系映射)框架,支持多种数据库,包括 PostgreSQL。以下是一个使用 SQLAlchemy 的示例:

“`python
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 建立连接
engine = create_engine(‘postgresql://my_username:my_password@localhost:5432/my_database’)

# 创建 Session
Session = sessionmaker(bind=engine)
session = Session()

# 定义模型
Base = declarative_base()

class MyTable(Base):
__tablename__ = ‘my_table’
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)

# 创建表
Base.metadata.create_all(engine)

# 插入数据
my_data = MyTable(name=’John’, age=30)
session.add(my_data)
session.commit()

# 查询数据
rows = session.query(MyTable).filter_by(name=’John’).all()
for row in rows:
print(row.id, row.name, row.age)

# 关闭 Session
session.close()
“`

在上面的代码中,将 `my_username` 和 `my_password` 替换为要连接的用户名和密码,将 `localhost` 替换为 PostgreSQL 服务器的地址,将 `5432` 替换为 PostgreSQL 服务器的端口号,将 `my_database` 替换为要连接的数据库。

接下来,使用 `create_engine()` 方法建立连接,该方法需要指定连接字符串。在连接字符串中,使用 PostgreSQL 服务器的地址、端口号、用户名、密码和数据库。

然后,使用 `sessionmaker()` 方法创建 Session,使用 `Session()` 方法创建一个 Session 对象。这里使用 SQLAlchemy 的 ORM 功能,定义一个模型 `MyTable`,该模型对应一个名为 `my_table` 的表。使用 `Base.metadata.create_all()` 方法创建表。

接下来,使用 `session.add()` 方法插入数据,使用 `session.query()` 方法查询数据。在上面的示例中,使用 `filter_by()` 方法过滤数据,使用 `all()` 方法获取所有结果。

最后,使用 `session.close()` 方法关闭 Session。

使用 SQLAlchemy 连接 PostgreSQL 数据库可以提供更方便的 ORM 功能,并且可以支持更多的数据库。

 

Python 中连接 Oracle

要在 Python 中连接 Oracle 数据库,可以使用 cx_Oracle 模块。以下是一个简单的示例:

首先,需要安装 cx_Oracle 模块。可以使用 pip 命令进行安装:

“`
pip install cx_Oracle
“`

然后,可以使用以下代码连接到 Oracle 数据库:

“`python
import cx_Oracle

# 建立连接
conn = cx_Oracle.connect(‘my_username/my_password@my_host:my_port/my_service_name’)

# 创建游标
cur = conn.cursor()

# 创建表
cur.execute(‘CREATE TABLE my_table (id NUMBER(10), name VARCHAR2(50), age NUMBER(3))’)

# 插入数据
cur.execute(“INSERT INTO my_table (id, name, age) VALUES (:1, :2, :3)”, (1, ‘John’, 30))

# 查询数据
cur.execute(“SELECT * FROM my_table WHERE name = :1”, (‘John’,))
rows = cur.fetchall()
for row in rows:
print(row)

# 提交事务
conn.commit()

# 关闭游标和连接
cur.close()
conn.close()
“`

在上面的代码中,将 `my_username`、`my_password`、`my_host`、`my_port` 和 `my_service_name` 替换为要连接的用户名、密码、主机名、端口号和服务名。

接下来,使用 `cx_Oracle.connect()` 方法建立连接,该方法需要指定连接字符串。在连接字符串中,使用 Oracle 数据库的用户名、密码、主机名、端口号和服务名。

然后,使用 `conn.cursor()` 方法创建游标,使用 `cur.execute()` 方法执行 SQL 命令。在上面的示例中,使用 `CREATE TABLE` 命令创建表,使用 `INSERT INTO` 命令插入数据,使用 `SELECT` 命令查询数据。

最后,使用 `conn.commit()` 方法提交事务,使用 `cur.close()` 和 `conn.close()` 方法关闭游标和连接。

除了以上的连接方法外,还可以使用 SQLAlchemy 模块来连接 Oracle 数据库。SQLAlchemy 是一个 Python 的 SQL 工具和 ORM(对象关系映射)框架,支持多种数据库,包括 Oracle。以下是一个使用 SQLAlchemy 的示例:

“`python
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 建立连接
engine = create_engine(‘oracle+cx_oracle://my_username:my_password@my_host:my_port/my_service_name’)

# 创建 Session
Session = sessionmaker(bind=engine)
session = Session()

# 定义模型
Base = declarative_base()

class MyTable(Base):
__tablename__ = ‘my_table’
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)

# 创建表
Base.metadata.create_all(engine)

# 插入数据
my_data = MyTable(id=1, name=’John’, age=30)
session.add(my_data)
session.commit()

# 查询数据
rows = session.query(MyTable).filter_by(name=’John’).all()
for row in rows:
print(row.id, row.name, row.age)

# 关闭 Session
session.close()
“`

在上面的代码中,将 `my_username`、`my_password`、`my_host`、`my_port` 和 `my_service_name` 替换为要连接的用户名、密码、主机名、端口号和服务名。

接下来,使用 `create_engine()` 方法建立连接,该方法需要指定连接字符串。在连接字符串中,使用 Oracle 数据库的用户名、密码、主机名、端口号和服务名。

然后,使用 `sessionmaker()` 方法创建 Session,使用 `Session()` 方法创建一个 Session 对象。这里使用 SQLAlchemy 的 ORM 功能,定义一个模型 `MyTable`,该模型对应一个名为 `my_table` 的表。

使用 `Base.metadata.create_all()` 方法创建表。在上面的示例中,使用 `add()` 方法插入数据,使用 `query()` 方法查询数据。

最后,使用 `session.close()` 方法关闭 Session。

 

Python 中连接 RabbitMQ

要在 Python 中连接 RabbitMQ,可以使用 pika 模块。以下是一个简单的示例:

首先,需要安装 pika 模块。可以使用 pip 命令进行安装:

“`
pip install pika
“`

然后,可以使用以下代码连接到 RabbitMQ

“`python
import pika

# 建立连接
credentials = pika.PlainCredentials(‘my_username’, ‘my_password’)
connection = pika.BlockingConnection(pika.ConnectionParameters(‘my_host’, 5672, ‘/’, credentials))
channel = connection.channel()

# 声明队列
channel.queue_declare(queue=’my_queue’)

# 发送消息
channel.basic_publish(exchange=”, routing_key=’my_queue’, body=’Hello, RabbitMQ!’)

# 接收消息
def callback(ch, method, properties, body):
print(“Received message:”, body)

channel.basic_consume(queue=’my_queue’, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

# 关闭连接
channel.close()
connection.close()
“`

在上面的代码中,将 `my_username`、`my_password`、`my_host` 替换为要连接的用户名、密码和主机名。

接下来,使用 `pika.BlockingConnection()` 方法建立连接,该方法需要指定连接参数。在连接参数中,使用 RabbitMQ 的主机名、端口号、虚拟主机名和凭证。

然后,使用 `channel.queue_declare()` 方法声明队列,使用 `channel.basic_publish()` 方法发送消息。

使用 `channel.basic_consume()` 方法接收消息,需要定义一个回调函数 `callback()`,接收消息时会调用该函数。在上面的示例中,定义的回调函数将收到的消息打印出来。

最后,使用 `channel.close()` 和 `connection.close()` 方法关闭连接。

除了以上的连接方法外,还可以使用 kombu 模块来连接 RabbitMQ。kombu 是一个 Python 的消息传递库,支持多种消息传递协议,包括 RabbitMQ。以下是一个使用 kombu 的示例:

“`python
from kombu import Connection, Exchange, Queue

# 建立连接
conn = Connection(‘amqp://my_username:my_password@my_host:5672//’)
channel = conn.channel()

# 定义 Exchange 和 Queue
exchange = Exchange(‘my_exchange’, type=’direct’)
queue = Queue(‘my_queue’, exchange, routing_key=’my_queue’)

# 发送消息
producer = conn.Producer(serializer=’json’)
producer.publish({‘message’: ‘Hello, RabbitMQ!’}, exchange=exchange, routing_key=’my_queue’)

# 接收消息
def callback(body, message):
print(“Received message:”, body)
message.ack()

with conn.Consumer(queue, callbacks=[callback]) as consumer:
while True:
conn.drain_events()

# 关闭连接
channel.close()
conn.close()
“`

在上面的代码中,将 `my_username`、`my_password`、`my_host` 替换为要连接的用户名、密码和主机名。

接下来,使用 `Connection()` 方法建立连接,该方法需要指定连接字符串。在连接字符串中,使用 RabbitMQ 的用户名、密码、主机名和端口号。

然后,使用 `Exchange()` 和 `Queue()` 方法定义 Exchange 和 Queue,使用 `Producer()` 方法发送消息。

使用 `Consumer()` 方法接收消息,需要定义一个回调函数 `callback()`,接收到消息时会调用该函数。在上面的示例中,定义的回调函数将收到的消息打印出来,并使用 `ack()` 方法确认消息已接收。

最后,使用 `channel.close()` 和 `conn.close()` 方法关闭连接。

 

© 版权声明

☆ END ☆
喜欢就点个赞吧
点赞0 分享
图片正在生成中,请稍后...